package realtime.app.zyjdwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class ZyjDwdTrafficUserJumpDetail {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> pageLog = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_traffic_page_log", "uj-page-app"));
        pageLog.print("pagelog>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
        SingleOutputStreamOperator<JSONObject> mappedStream = pageLog.flatMap(
                new FlatMapFunction<String, JSONObject>() {
                    @Override
                    public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                        try {
                            JSONObject jsonObj = JSON.parseObject(value);
                            out.collect(jsonObj);
                        } catch (Exception e) {
                            System.out.println("脏数据:"+value);
                        }
                    }
                }
        );
        SingleOutputStreamOperator<JSONObject> ts = mappedStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return element.getLong("ts");
                    }
                }));
        KeyedStream<JSONObject, String> keyedStream = ts.keyBy(data -> data.getJSONObject("common").getString("mid"));
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start")
                .where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        String string = value.getJSONObject("page").getString("last_page_id");
                        return string == null;
                    }
                })
                .next("next")
                .where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        String string = value.getJSONObject("page").getString("last_page_id");
                        return string == null;
                    }
                }).within(Time.seconds(10));


        PatternStream<JSONObject> pattern1 = CEP.pattern(keyedStream, pattern);
        OutputTag<String> cs = new OutputTag<String>("chaoshi") {
        };
        SingleOutputStreamOperator<String> streamOperator = pattern1.flatSelect(cs, new PatternFlatTimeoutFunction<JSONObject, String>() {
            @Override
            public void timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
                JSONObject frist = pattern.get("start").get(0);
                out.collect(frist.toJSONString());
            }
        }, new PatternFlatSelectFunction<JSONObject, String>() {
            @Override
            public void flatSelect(Map<String, List<JSONObject>> pattern, Collector<String> out) throws Exception {
                JSONObject frist = pattern.get("start").get(0);
                out.collect(frist.toJSONString());
            }
        });
        DataStream<String> sideOutput = streamOperator.getSideOutput(cs);
        DataStream<String> union = streamOperator.union(sideOutput);
        union.print("all>>>>>>>>>>>>>>>>>>>>>>>>");


        env.execute();
    }

}
